27.5 多智能体协作

19 分钟阅读

多智能体协作概述#

多智能体协作是指多个AI智能体协同工作,共同完成复杂任务的能力。通过协作,智能体可以共享信息、分工合作、互相补充,从而提高整体效率和效果。

多智能体协作的基本概念#

1. 什么是多智能体协作#

多智能体协作是指多个AI智能体通过通信、协调和合作,共同完成单个智能体难以完成的复杂任务。

多智能体协作的特点:

  • 分布式决策: 每个智能体独立决策
  • 信息共享: 智能体之间共享信息
  • 任务分工: 智能体分工合作
  • 协同优化: 整体优化而非局部优化

2. 协作模式#

协作模式特点适用场景
主从模式一个主智能体协调多个从智能体层次化任务
平等模式智能体地位平等,共同决策协作任务
竞争模式智能体竞争完成任务优化问题
混合模式结合多种模式复杂场景

智能体通信机制#

1. 消息传递#

示例:消息传递机制

用户请求: "实现智能体之间的消息传递机制"

Claude Code 生成的代码:

python
````python from typing import Dict, List, Any, Optional, Callable from datetime import datetime from enum import Enum import logging import asyncio from dataclasses import dataclass, field logger = logging.getLogger(__name__) class MessageType(Enum): """消息类型""" REQUEST = "request" RESPONSE = "response" NOTIFICATION = "notification" BROADCAST = "broadcast" @dataclass class Message: """消息""" id: str type: MessageType sender: str receiver: str content: Dict[str, Any] timestamp: datetime = field(default_factory=datetime.utcnow) reply_to: Optional[str] = None metadata: Dict[str, Any] = field(default_factory=dict) class MessageBus: """消息总线""" def __init__(self): self.agents: Dict[str, 'Agent'] = {} self.message_queue: asyncio.Queue = asyncio.Queue() self.message_handlers: Dict[str, Callable] = {} self.running = False def register_agent(self, agent: 'Agent'): """注册智能体""" self.agents[agent.id] = agent logger.info(f"Agent registered: {agent.id}") def unregister_agent(self, agent_id: str): """注销智能体""" if agent_id in self.agents: del self.agents[agent_id] logger.info(f"Agent unregistered: {agent_id}") async def send_message(self, message: Message): """发送消息""" receiver = self.agents.get(message.receiver) if not receiver: logger.warning(f"Receiver not found: {message.receiver}") return # 添加到消息队列 await self.message_queue.put(message) logger.info(f"Message sent from {message.sender} to {message.receiver}") async def broadcast_message(self, message: Message): """广播消息""" for agent_id, agent in self.agents.items(): if agent_id != message.sender: broadcast_msg = Message( id=f"{message.id}_to_{agent_id}", type=message.type, sender=message.sender, receiver=agent_id, content=message.content, timestamp=message.timestamp, metadata=message.metadata ) await self.send_message(broadcast_msg) logger.info(f"Message broadcasted from {message.sender}") def register_handler(self, message_type: str, handler: Callable): """注册消息处理器""" self.message_handlers[message_type] = handler logger.info(f"Handler registered for message type: {message_type}") async def start(self): """启动消息总线""" self.running = True logger.info("Message bus started") # 启动消息处理循环 asyncio.create_task(self._process_messages()) async def stop(self): """停止消息总线""" self.running = False logger.info("Message bus stopped") async def _process_messages(self): """处理消息""" while self.running: try: # 获取消息 message = await asyncio.wait_for( self.message_queue.get(), timeout=1.0 ) # 传递给接收者 receiver = self.agents.get(message.receiver) if receiver: await receiver.receive_message(message) except asyncio.TimeoutError: continue except Exception as e: logger.error(f"Error processing message: {e}") class Agent: """智能体基类""" def __init__(self, agent_id: str, message_bus: MessageBus): self.id = agent_id self.message_bus = message_bus self.message_handlers: Dict[str, Callable] = {} self.knowledge_base: Dict[str, Any] = {} async def receive_message(self, message: Message): """接收消息""" logger.info(f"Agent {self.id} received message from {message.sender}") # 查找处理器 handler = self.message_handlers.get(message.type.value) if handler: # 调用处理器 response = await handler(message) # 如果是请求消息,发送响应 if message.type == MessageType.REQUEST: response_message = Message( id=f"response_{message.id}", type=MessageType.RESPONSE, sender=self.id, receiver=message.sender, content=response, reply_to=message.id ) await self.message_bus.send_message(response_message) else: logger.warning(f"No handler for message type: {message.type.value}") def register_handler(self, message_type: str, handler: Callable): """注册消息处理器""" self.message_handlers[message_type] = handler logger.info(f"Agent {self.id} registered handler for {message_type}") async def send_request(self, receiver_id: str, content: Dict[str, Any]) -> Message: """发送请求""" message = Message( id=f"req_{datetime.utcnow().timestamp()}", type=MessageType.REQUEST, sender=self.id, receiver=receiver_id, content=content ) await self.message_bus.send_message(message) return message async def send_response(self, original_message: Message, content: Dict[str, Any]): """发送响应""" message = Message( id=f"resp_{datetime.utcnow().timestamp()}", type=MessageType.RESPONSE, sender=self.id, receiver=original_message.sender, content=content, reply_to=original_message.id ) await self.message_bus.send_message(message) async def send_notification(self, receiver_id: str, content: Dict[str, Any]): """发送通知""" message = Message(

id=f"notif_{datetime.utcnow().timestamp()}", type=MessageType.NOTIFICATION, sender=self.id, receiver=receiver_id, content=content )

await self.message_bus.send_message(message)

async def broadcast(self, content: Dict[str, Any]): """广播消息""" message = Message( id=f"broadcast_{datetime.utcnow().timestamp()}", type=MessageType.BROADCAST, sender=self.id, receiver="all", content=content )

await self.message_bus.broadcast_message(message)

def update_knowledge(self, key: str, value: Any): """更新知识库""" self.knowledge_base[key] = value logger.info(f"Agent {self.id} updated knowledge: {key}")

def get_knowledge(self, key: str) -> Optional[Any]: """获取知识""" return self.knowledge_base.get(key)

使用示例

async def main(): """主函数"""

创建消息总线

message_bus = MessageBus() await message_bus.start()

创建智能体

agent1 = Agent("agent1", message_bus) agent2 = Agent("agent2", message_bus) agent3 = Agent("agent3", message_bus)

注册智能体

message_bus.register_agent(agent1) message_bus.register_agent(agent2) message_bus.register_agent(agent3)

注册消息处理器

async def handle_request(message: Message) -> Dict[str, Any]: """处理请求""" print(f"Agent {agent1.id} handling request from {message.sender}") return {'status': 'success', 'data': 'processed'}

agent1.register_handler('request', handle_request)

async def handle_notification(message: Message) -> Dict[str, Any]: """处理通知""" print(f"Agent {agent2.id} received notification from {message.sender}") return {'status': 'acknowledged'}

agent2.register_handler('notification', handle_notification)

发送消息

await agent1.send_request('agent2', {'task': 'process_data'}) await agent2.send_notification('agent3', {'event': 'update'}) await agent3.broadcast({'announcement': 'system_ready'})

等待消息处理

await asyncio.sleep(1)

停止消息总线

await message_bus.stop()

if name == 'main': asyncio.run(main())

bash
> - 异步消息传递
> - 支持点对点和广播
> - 消息类型分类
> - 请求-响应模式

2. 共享知识库#

示例:共享知识库

用户请求: "实现智能体之间的共享知识库" Claude Code 生成的代码:

python
`python from typing import Dict, List, Any, Optional from datetime import datetime import logging import json logger = logging.getLogger(__name__) class SharedKnowledgeBase: """共享知识库""" def __init__(self): self.knowledge: Dict[str, Any] = {} self.access_log: List[Dict[str, Any]] = [] self.version: int = 0 def add_knowledge(self, key: str, value: Any, agent_id: str) -> bool: """添加知识""" if key in self.knowledge: logger.warning(f"Knowledge key already exists: {key}") return False self.knowledge[key] = { 'value': value, 'agent_id': agent_id, 'created_at': datetime.utcnow(), 'version': self.version } self.version += 1 self._log_access(agent_id, 'add', key) logger.info(f"Knowledge added: {key} by {agent_id}") return True def update_knowledge(self, key: str, value: Any, agent_id: str) -> bool: """更新知识""" if key not in self.knowledge: logger.warning(f"Knowledge key not found: {key}") return False old_value = self.knowledge[key]['value'] self.knowledge[key]['value'] = value self.knowledge[key]['updated_at'] = datetime.utcnow() self.knowledge[key]['updated_by'] = agent_id self.knowledge[key]['version'] = self.version self.knowledge[key]['old_value'] = old_value self.version += 1 self._log_access(agent_id, 'update', key) logger.info(f"Knowledge updated: {key} by {agent_id}") return True def get_knowledge(self, key: str, agent_id: str) -> Optional[Any]: """获取知识""" if key not in self.knowledge: logger.warning(f"Knowledge key not found: {key}") return None self._log_access(agent_id, 'read', key) return self.knowledge[key]['value'] def delete_knowledge(self, key: str, agent_id: str) -> bool: """删除知识""" if key not in self.knowledge: logger.warning(f"Knowledge key not found: {key}") return False del self.knowledge[key] self._log_access(agent_id, 'delete', key) logger.info(f"Knowledge deleted: {key} by {agent_id}") return True def search_knowledge(self, query: str, agent_id: str) -> List[Dict[str, Any]]: """搜索知识""" results = [] query_lower = query.lower() for key, knowledge in self.knowledge.items(): if query_lower in key.lower(): results.append({ 'key': key, 'value': knowledge['value'], 'agent_id': knowledge['agent_id'], 'version': knowledge['version'] }) self._log_access(agent_id, 'search', query) logger.info(f"Knowledge search: {query} by {agent_id}, found {len(results)} results") return results def get_all_knowledge(self, agent_id: str) -> Dict[str, Any]: """获取所有知识""" self._log_access(agent_id, 'read_all', 'all') return {key: knowledge['value'] for key, knowledge in self.knowledge.items()} def get_knowledge_by_agent(self, agent_id: str) -> Dict[str, Any]: """获取智能体的知识""" agent_knowledge = {} for key, knowledge in self.knowledge.items(): if knowledge['agent_id'] == agent_id: agent_knowledge[key] = knowledge['value'] return agent_knowledge def merge_knowledge(self, other_knowledge: Dict[str, Any], agent_id: str) -> int: """合并知识""" merged_count = 0 for key, value in other_knowledge.items(): if key not in self.knowledge: self.add_knowledge(key, value, agent_id) merged_count += 1 logger.info(f"Knowledge merged: {merged_count} items by {agent_id}") return merged_count def _log_access(self, agent_id: str, action: str, key: str): """记录访问日志""" self.access_log.append({ 'agent_id': agent_id, 'action': action, 'key': key, 'timestamp': datetime.utcnow() }) def get_access_log(self, agent_id: Optional[str] = None) -> List[Dict[str, Any]]: """获取访问日志""" if agent_id: return [log for log in self.access_log if log['agent_id'] == agent_id] return self.access_log def get_statistics(self) -> Dict[str, Any]: """获取统计信息""" agent_counts = {} for knowledge in self.knowledge.values(): agent_id = knowledge['agent_id'] agent_counts[agent_id] = agent_counts.get(agent_id, 0) + 1 return { 'total_knowledge': len(self.knowledge), 'version': self.version, 'agent_counts': agent_counts, 'total_accesses': len(self.access_log) } class KnowledgeSharingAgent(Agent): """知识共享智能体""" def __init__(self, agent_id: str, message_bus: MessageBus, knowledge_base: SharedKnowledgeBase): super().__init__(agent_id, message_bus) self.knowledge_base = knowledge_base async def share_knowledge(self, key: str, value: Any): """分享知识""" success = self.knowledge_base.add_knowledge(key, value, self.id) if success: # 广播知识更新 await self.broadcast({ 'type': 'knowledge_update', 'key': key, 'agent_id': self.id }) async def request_knowledge(self, key: str, target_agent_id: str) -> Optional[Any]: """请求知识""" # 发送知识请求 await self.send_request(target_agent_id, { 'type': 'knowledge_request', 'key': key }) # 等待响应 await asyncio.sleep(0.5) # 从知识库获取 return self.knowledge_base.get_knowledge(key, self.id) async def sync_knowledge(self): """同步知识""" # 获取其他智能体的知识 other_agents = [aid for aid in self.message_bus.agents.keys() if aid != self.id] for agent_id in other_agents: await self.send_request(agent_id, { 'type': 'knowledge_sync_request' }) # 使用示例 async def main(): """主函数""" # 创建消息总线 message_bus = MessageBus() await message_bus.start() # 创建共享知识库 knowledge_base = SharedKnowledgeBase() # 创建智能体 agent1 = KnowledgeSharingAgent("agent1", message_bus, knowledge_base) agent2 = KnowledgeSharingAgent("agent2", message_bus, knowledge_base) agent3 = KnowledgeSharingAgent("agent3", message_bus, knowledge_base) # 注册智能体 message_bus.register_agent(agent1) message_bus.register_agent(agent2) message_bus.register_agent(agent3) # 分享知识 await agent1.share_knowledge("python_syntax", "Python uses indentation for code blocks") await agent2.share_knowledge("javascript_syntax", "JavaScript uses curly braces for code blocks") await agent3.share_knowledge("java_syntax", "Java uses curly braces and semicolons") # 请求知识 python_syntax = await agent2.request_knowledge("python_syntax", "agent1") print(f"Python syntax: {python_syntax}") # 获取统计信息 stats = knowledge_base.get_statistics() print(f"Knowledge base statistics: {stats}") # 停止消息总线 await message_bus.stop() if __name__ == '__main__': asyncio.run(main()) ```> **共享知识库特点**: > - 集中式知识存储 > - 访问控制和日志 > - 知识搜索和检索 > - 智能体间知识同步 ``` ## 协作任务执行 ### 1. 任务分配 # 示例:任务分配 用户请求: "实现智能体之间的任务分配机制" Claude Code 生成的代码: ````python `python from typing import Dict, List, Any, Optional from datetime import datetime import logging import asyncio logger = logging.getLogger(__name__) class Task: """任务""" def __init__(self, task_id: str, name: str, description: str, requirements: List[str], estimated_duration: float): self.id = task_id self.name = name self.description = description self.requirements = requirements self.estimated_duration = estimated_duration self.assigned_to: Optional[str] = None self.status = "pending" self.created_at = datetime.utcnow() self.started_at: Optional[datetime] = None self.completed_at: Optional[datetime] = None self.result: Optional[Any] = None class TaskAllocator: """任务分配器""" def __init__(self): self.tasks: Dict[str, Task] = {} self.agents: Dict[str, Dict[str, Any]] = {} self.agent_workload: Dict[str, float] = {} def register_agent(self, agent_id: str, capabilities: List[str], max_workload: float = 1.0): """注册智能体""" self.agents[agent_id] = { 'capabilities': capabilities, 'max_workload': max_workload, 'current_workload': 0.0 } self.agent_workload[agent_id] = 0.0 logger.info(f"Agent registered: {agent_id} with capabilities {capabilities}") def add_task(self, task: Task): """添加任务""" self.tasks[task.id] = task logger.info(f"Task added: {task.id}") def allocate_task(self, task_id: str) -> Optional[str]: """分配任务""" task = self.tasks.get(task_id) if not task: logger.warning(f"Task not found: {task_id}") return None # 查找合适的智能体 suitable_agents = self._find_suitable_agents(task) if not suitable_agents: logger.warning(f"No suitable agents for task: {task_id}") return None # 选择负载最低的智能体 selected_agent = self._select_least_loaded_agent(suitable_agents) # 分配任务 task.assigned_to = selected_agent task.status = "assigned" self.agents[selected_agent]['current_workload'] += task.estimated_duration self.agent_workload[selected_agent] = self.agents[selected_agent]['current_workload'] logger.info(f"Task {task_id} assigned to {selected_agent}") return selected_agent def _find_suitable_agents(self, task: Task) -> List[str]: """查找合适的智能体""" suitable_agents = [] for agent_id, agent_info in self.agents.items(): # 检查能力匹配 capabilities_match = all( req in agent_info['capabilities'] for req in task.requirements ) # 检查负载 has_capacity = ( agent_info['current_workload'] + task.estimated_duration <= agent_info['max_workload'] ) if capabilities_match and has_capacity: suitable_agents.append(agent_id) return suitable_agents def _select_least_loaded_agent(self, agents: List[str]) -> str: """选择负载最低的智能体""" return min(agents, key=lambda aid: self.agent_workload[aid]) def complete_task(self, task_id: str, result: Any): """完成任务""" task = self.tasks.get(task_id) if not task: logger.warning(f"Task not found: {task_id}") return # 更新任务状态 task.status = "completed" task.completed_at = datetime.utcnow() task.result = result # 更新智能体负载 if task.assigned_to: self.agents[task.assigned_to]['current_workload'] -= task.estimated_duration self.agent_workload[task.assigned_to] = self.agents[task.assigned_to]['current_workload'] logger.info(f"Task {task_id} completed by {task.assigned_to}") def get_agent_tasks(self, agent_id: str) -> List[Task]: """获取智能体的任务""" return [ task for task in self.tasks.values() if task.assigned_to == agent_id and task.status != "completed" ] def get_statistics(self) -> Dict[str, Any]: """获取统计信息""" total_tasks = len(self.tasks) completed_tasks = len([t for t in self.tasks.values() if t.status == "completed"]) pending_tasks = len([t for t in self.tasks.values() if t.status == "pending"]) assigned_tasks = len([t for t in self.tasks.values() if t.status == "assigned"]) return { 'total_tasks': total_tasks, 'completed_tasks': completed_tasks, 'pending_tasks': pending_tasks, 'assigned_tasks': assigned_tasks, 'agent_workload': self.agent_workload.copy() } # 使用示例 async def main(): """主函数""" allocator = TaskAllocator() # 注册智能体 allocator.register_agent("agent1", ["code_generation", "code_review"], max_workload=2.0) allocator.register_agent("agent2", ["code_review", "testing"], max_workload=1.5) allocator.register_agent("agent3", ["code_generation", "testing"], max_workload=2.0) # 添加任务 tasks = [ Task("task1", "Generate User Module", "Generate user authentication module", ["code_generation"], 1.0), Task("task2", "Review User Module", "Review user authentication module", ["code_review"], 0.5), Task("task3", "Generate Product Module", "Generate product management module", ["code_generation"], 1.0), Task("task4", "Test User Module", "Test user authentication module", ["testing"], 0.5), Task("task5", "Review Product Module", "Review product management module", ["code_review"], 0.5), Task("task6", "Test Product Module", "Test product management module", ["testing"], 0.5), ] for task in tasks: allocator.add_task(task) # 分配任务 for task in tasks: allocator.allocate_task(task.id) # 获取统计信息 stats = allocator.get_statistics() print(f"Allocation statistics: {stats}") # 获取智能体任务 for agent_id in ["agent1", "agent2", "agent3"]: agent_tasks = allocator.get_agent_tasks(agent_id) print(f"\nAgent {agent_id} tasks:") for task in agent_tasks: print(f" - {task.name} ({task.estimated_duration}h)") if __name__ == '__main__': asyncio.run(main()) ```> **任务分配特点**: > - 基于能力匹配 > - 考虑负载均衡 > - 动态调整分配 > - 任务状态跟踪 ``` ## 总结 多智能体协作包括: 1. **多智能体协作的基本概念**: 什么是多智能体协作、协作模式 2. **智能体通信机制**: 消息传递、共享知识库 3. **协作任务执行**: 任务分配 通过多智能体协作,Claude Code可以实现更复杂的任务,提高整体效率和效果。 至此,第27"Agentic AI 核心技术"全部完成。接下来我们将创建第28"Claude Code 架构解析" ```

标记本节教程为已读

记录您的学习进度,方便后续查看。